package com.lzx.hbh_system.util;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

@Slf4j
@Component
public class SseUtil {
    @Autowired
    private RedisUtils redisUtils;
    private static SseUtil sseUtil;

    @PostConstruct
    public void init(){
        sseUtil = this;
        redisUtils = this.redisUtils;
    }
    /**
     * 当前连接数
     */
    private static AtomicInteger count = new AtomicInteger(0);

    /**
     * 使用map对象，便于根据userId来获取对应的SseEmitter，或者放redis里面
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
//    private static Map<String, SseEmitter> sseEmitterMap = new HashMap<>();
    public SseUtil() {
    }

    /**
     * 创建用户连接并返回 SseEmitter
     *
     * @param userId 用户ID或唯一字段
     * @return SseEmitter
     */
    public static SseEmitter connect(String userId) {
        // 设置超时时间，0表示不过期。默认30秒，超过时间未完成会抛出异常：AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        // 数量+1
        count.getAndIncrement();
        //用户客户端连接成功后添加进Redis存储或者更新
        // 存储或更新sseEmitterMap
        sseUtil.redisUtils.set("sseEmitterMap", JSONObject.toJSONString(sseEmitterMap));
        // 存储或更新连接客户端用户数量
        sseUtil.redisUtils.set("connetCount",count);
        log.info("创建新的sse连接，当前用户：{}", userId);
        return sseEmitter;
    }

    /**
     * 给指定用户发送信息
     * 问题1记录：redis不能很好的存放Map new ConcurrentHashMap<>(); ，问题2 客户端连接上了，可是没有调用massage的函数经行消息接收
     * 客户端报错：ERR_INCOMPLETE_CHUNKED_ENCODING 200 (OK)
     * 问题解决记录：controller使用了@rescontroller导致返回问
     * 解决时间：2022-2-7
     * 问题2记录：连接超时，删除客户端记录时，后台报错
     */
    public static void sendMessage(String userId, String message) {
        System.out.println(1111);
        Map<Object, Object> RedissseEmitterMap = new HashMap<>();
            try {
                log.info("向客户端用户名为：["+userId+"],推送数据：["+message+"],推送时间：["+TurnDateToStringUtil.getTime()+"]");
                //判断redis是否存储在，不在则再从声明静态map中获取
//                RedissseEmitterMap = sseUtil.redisUtils.getObject("sseEmitterMap") !=null ? (Map<String, SseEmitter>) sseUtil.redisUtils.getObject("sseEmitterMap") : sseEmitterMap;
//                // sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
//                //

                System.out.println(sseUtil.redisUtils.get("sseEmitterMap"));
                sseEmitterMap.get(userId).send(SseEmitter.event().name("message").data(message));
                System.out.println(111);
//                RedissseEmitterMap.get(userId).send(message);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", userId, e.getMessage());
                removeUser(userId);
            }
    }

    /**
     * 群发消息
     */
    public static void batchSendMessage(String wsInfo, List<String> ids) {
        ids.forEach(userId -> sendMessage(wsInfo, userId));
    }

    /**
     * 群发所有人
     */
    public static void batchSendMessage(String wsInfo) {
        Map<String, SseEmitter> RedissseEmitterMap = sseUtil.redisUtils.get("sseEmitterMap") !=null ? (Map<String, SseEmitter>) sseUtil.redisUtils.getObject("sseEmitterMap") : sseEmitterMap;
        RedissseEmitterMap.forEach((k, v) -> {
            try {
                v.send(wsInfo, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", k, e.getMessage());
                removeUser(k);
            }
        });
    }

    /**
     * 移除用户连接
     */
    public static void removeUser(String userId) {
//        Map<String, SseEmitter> RedissseEmitterMap = sseUtil.redisUtils.getObject("sseEmitterMap") !=null ? (Map<String, SseEmitter>) sseUtil.redisUtils.getObject("sseEmitterMap") : sseEmitterMap;

//        RedissseEmitterMap.remove(userId);
        sseEmitterMap.remove(userId);
        // 数量-1
        count.getAndDecrement();
        //用户客户端连接成功后添加进Redis存储或者更新
/*        // 存储或更新sseEmitterMap
        sseUtil.redisUtils.set("sseEmitterMap",RedissseEmitterMap);
        // 存储或更新连接客户端用户数量
        sseUtil.redisUtils.set("connetCount",count);*/
        log.info("移除用户：{}", userId);
    }

    /**
     * 获取当前连接信息
     */
    public static List<String> getIds() {
        Map<String, SseEmitter> RedissseEmitterMap = null;
        RedissseEmitterMap = (Map<String, SseEmitter>) sseUtil.redisUtils.getObject("sseEmitterMap");
        return RedissseEmitterMap != null ? new ArrayList<>(RedissseEmitterMap.keySet()) : new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * 获取当前连接数量
     */
    public static int getUserCount() {
        Integer redisConnectUserCount = null;
        redisConnectUserCount = (Integer) sseUtil.redisUtils.getObject("connetCount");
        return redisConnectUserCount != null ? redisConnectUserCount : count.intValue();
    }

    private static Runnable completionCallBack(String userId) {
        return () -> {
            log.info("结束连接：{}", userId);
            removeUser(userId);
        };
    }

    private static Runnable timeoutCallBack(String userId) {
        return () -> {
            log.info("连接超时：{}", userId);
            removeUser(userId);
        };
    }

    private static Consumer<Throwable> errorCallBack(String userId) {
        return throwable -> {
            log.info("连接异常：{}", userId);
            removeUser(userId);
        };
    }
}
